Java NIO

关于对于 NIO笔记(一)基础内容的总结

若有疑问,去 https://aistudio.google.com/prompts/1gGBqWswXZKAtq8zdF5p8HRi6prHlseRa 查看/提问

NIO 是 Non-blocking I/O(非阻塞 IO)或者 New I/O(新 IO)的缩写。它是 Java 提供的一套用来处理输入输出的 API,与我们之前可能在 Java SE 或 Web 阶段学到的传统 IO(比如 FileInputStream, ServerSocket)不同。

核心区别:

这种非阻塞的特性对于需要同时处理大量连接(比如成千上万个客户端)的网络服务器来说至关重要,因为服务器不能因为等待某一个客户端而卡住,耽误了处理其他客户端。

NIO 主要围绕着三个核心组件构建:Buffer(缓冲区)Channel(通道)Selector(选择器)。我们一个一个来看。


Buffer (缓冲区)

你可以把 Buffer 想象成一个带有额外管理功能的数组,或者一个智能的“桶”。它是 NIO 中用来临时存放数据的主要容器。无论你是要从文件/网络读取数据,还是要把数据写入文件/网络,数据都需要先经过 Buffer 中转。

核心概念详解:

  1. 四个核心变量/指针: 这是理解 Buffer 操作的关键。

    • capacity (容量):Buffer 能容纳的总数据量(单位是它存储的数据类型,比如 IntBuffer 就是 int 的数量)。一旦创建,容量是固定不变的。(桶的总容积)
    • limit (限制):
      • 读模式下(从 Buffer 往外读数据),limit 表示你最多能读到哪个位置,也就是有效数据的末尾。
      • 写模式下(往 Buffer 里写数据),limit 通常等于 capacity,表示你最多可以写到哪个位置
      • 你不能读写超过 limit 的位置。(读模式下:桶里实际的水位线;写模式下:你打算最多把水加到哪里,通常是桶口)
    • position (位置):表示下一个要被读取或写入的元素的位置(索引)。每次调用 get() (读) 或 put() (写) 之后,position 会自动向后移动。(你当前正准备从哪个位置倒水,或者往哪个位置加水)
    • mark (标记):一个临时的书签。你可以调用 mark() 方法记住当前的 position,之后可以通过调用 reset() 方法将 position 恢复到之前标记的位置。(在桶壁上某个水位画个记号)
    • 它们的关系恒定: 0 <= mark <= position <= limit <= capacity
  2. Buffer 的种类与创建:

    • Java 为大部分基本数据类型(除了 boolean)都提供了对应的 Buffer 类:ByteBuffer, CharBuffer, IntBuffer, ShortBuffer, LongBuffer, FloatBuffer, DoubleBuffer。其中 ByteBuffer 是最常用的,因为文件和网络传输的底层数据通常是字节。
    • 创建方式 (不用 new):
      • Buffer类名.allocate(capacity):创建一个指定容量的 Buffer。这个 Buffer 的数据实际存储在 Java 堆内存的一个数组里。这被称为“堆缓冲区”(Heap Buffer)。(造一个新的空桶)
      • Buffer类名.wrap(现有数组):用一个已经存在的数组来创建一个 Buffer。这个 Buffer 直接使用这个数组来存数据,会复制一份。所以,修改 Buffer 里的数据,会直接改变原数组;反之亦然。(直接拿一个现成的装满东西的数组,给它套上 Buffer 的管理功能)
  3. 写入数据 (put 操作): 往 Buffer 里放数据。

    • buffer.put(数据):在当前的 position 位置放入数据,然后 position 加 1。
    • buffer.put(索引, 数据):在指定索引 位置放入数据,不会改变当前的 position
    • buffer.put(数组)buffer.put(数组, offset, length):将源数组中的数据(全部或部分)批量复制到 Buffer 中,从 Buffer 的当前 position 开始放,并相应地移动 position
    • buffer.put(另一个Buffer):将另一个 Buffer 中剩余未读的数据复制到当前 Buffer 中。
  4. 至关重要的 flip() 方法:

    • 为什么需要它? 当你用 put 方法向 Buffer 中写完数据后,position 指向的是你刚写入数据的后面,而 limit 很可能还是等于 capacity。这时,如果你想读取你刚刚写入的数据,你需要做两件事:
      1. 告诉 Buffer 读取操作应该在哪里结束,也就是你实际写入数据的末尾。这个末尾就是你写完数据后的 position
      2. 将读取的起始位置设置回 Buffer 的开头(索引 0)。
    • flip() 做了什么? 它正是帮你完成这两件事:limit = position; position = 0; mark = -1; (把 limit 设置为当前 position,再把 position 归零,同时清除 mark 标记)。
    • 比喻: 你往桶里加水(put),加到一定水位后停下(此时 position 在水位线)。现在你想把桶里的水倒出来(get)。flip() 操作就像是:1. 在当前水位线那里给桶加了个盖子(limit = position),防止倒水时超过这个水位。2. 把倒水的起点设置回桶底(position = 0)。
  5. 读取数据 (get 操作): 从 Buffer 里取数据。

    • buffer.get():读取当前 position 位置的数据,然后 position 加 1。
    • buffer.get(索引):读取指定 索引 位置的数据,不会改变当前的 position
    • buffer.get(目标数组)buffer.get(目标数组, offset, length):将 Buffer 中的数据(从当前 position 开始)批量复制到目标数组中,并相应地移动 position
    • remaining():返回 limit - position,告诉你还有多少数据可以读取
    • hasRemaining():检查是否还有数据可读 (position < limit)。经常用在循环中:while(buffer.hasRemaining()) { 处理(buffer.get()); }
  6. 其他常用方法:

    • rewind()position = 0; mark = -1;limit 不变。让你能重新从头读取 Buffer 中的数据(前提是你之前已经 flip() 过了)。(把倒水的起点移回桶底,准备重新倒一遍)
    • clear()position = 0; limit = capacity; mark = -1;。让 Buffer 准备好重新写入数据。它并不会真的清除 Buffer 里的旧数据,只是把指针复位了,后续写入会覆盖旧数据。(把桶“清空”——其实水还在,但把水位标记和倒水位置都重置了,看起来像空桶一样,可以重新加水了)
    • compact():将所有未读的数据(从 positionlimit 之间的数据)移动到 Buffer 的开头。然后,position 设置为这些移动后数据的末尾,limit 设置为 capacity。这在你读取了一部分数据,又想继续往 Buffer 里写新数据,但不想覆盖掉还没读的那部分数据时很有用。(把桶里没倒完的水,都集中到底部,然后把加水的位置设在这些水之上,准备继续加水)
    • mark() & reset():前面提过,设置和恢复书签。
    • duplicate():创建一个新的 Buffer 对象,但这个新 Buffer 和原始 Buffer 共享同一个底层数组(或者同一块内存区域)。它们各自拥有独立的 position, limit, mark 状态。但是,如果你通过一个 Buffer 修改了数据(例如 buffer1.put(0, 100)),那么通过另一个 Buffer 读取相同位置的数据(例如 buffer2.get(0))会看到这个修改。(像是给同一个桶,贴了两套不同的水位标记和读写位置指示器,但操作的都是同一个桶里的水)
    • slice():创建一个新的 Buffer,它代表原始 Buffer 的一个子序列(从原始 Buffer 的 positionlimit)。它也共享底层数据,但拥有自己独立的、从 0 开始计算的 position, limit, mark。(像是从大桶里划出一块区域,给这块区域单独一套标记,但操作的还是大桶里那部分水)
  7. 只读缓冲区 (Read-Only Buffers):

    • 通过 原buffer.asReadOnlyBuffer() 创建。它也是共享底层数据的,但是如果你尝试对这个只读 Buffer 调用任何 put 方法,它会抛出 ReadOnlyBufferException 异常。这是一种保护机制,用来安全地传递数据而不允许接收方修改它。
  8. ByteBuffer 和 CharBuffer 的特性:

    • ByteBuffer:非常基础和常用。它可以直接存取其他基本类型的数据(如 putInt(), getInt(), putDouble(), getDouble() 等)。你需要注意字节序(大端/小端),不过通常默认设置(大端)能满足大部分需求。
    • CharBuffer:专门用于处理字符数据。可以直接存取 String (put("字符串"), wrap("字符串"))。它实现了 CharSequence 接口,所以可以用很多类似 String 的方法,比如 charAt(), subSequence(),甚至可以直接打印 CharBuffer 对象(会自动调用 toString() 输出内容)。
  9. 直接缓冲区 (Direct Buffers):

    • 创建: 使用 ByteBuffer.allocateDirect(capacity)
    • 是什么: 它申请的内存不在 Java 的常规堆内存(Heap)中,而是在堆外的本地内存(Native Memory)中。
    • 为什么用: 对于某些底层的 IO 操作(如网络发送、文件读写),操作系统可能可以直接操作这块本地内存,避免了一次从 Java 堆内存到本地内存的数据复制,从而可能提高性能
    • 工作原理 (简化版): 内部使用了 Java 的一些底层、非标准的机制(如 sun.misc.Unsafe 类)来直接向操作系统申请和释放内存。
    • 垃圾回收: 因为内存不在 Java 堆上,普通的 GC 无法直接管理。NIO 使用了一种叫做 Cleaner 的机制,它与 Java 的虚引用(PhantomReference) 配合工作。当创建 DirectByteBuffer 的 Java 对象不再被任何地方引用,即将被 GC 回收时,Cleaner 会被触发,进而调用底层的 Unsafe 方法来释放之前申请的那块本地内存,防止内存泄漏。
    • 共享与 att 字段: 当你对一个 DirectByteBuffer 进行 duplicate()slice() 操作时,产生的新的 Buffer 对象同样共享那块本地内存。文档中提到的 att (attachment) 字段是一个内部实现细节,它的作用是让复制或切分出来的 Buffer 对象持有一个对原始 Buffer 对象(或者说持有对管理那块内存的对象的引用),确保即使原始的 Buffer 对象看似没有被引用了,只要还有复制品或切片在用,那块本地内存就不会被过早释放

Channel (通道)

你可以把 Channel 想象成连接数据源/目的地(比如文件、网络连接)和 Buffer(桶) 之间的管道。数据总是通过 Channel 从源流入 Buffer,或者从 Buffer 通过 Channel 流出到目的地。Channel 代表了与能够进行 IO 操作的实体(文件、套接字等)的连接。

核心概念详解:

  1. Channel vs. Stream (流):

    • 方向: Stream 通常是单向的(InputStream 只读,OutputStream 只写)。Channel 通常是双向的(比如 SocketChannel 既可读也可写),但也有单向的(如 FileChannelFileInputStream 获取时只读)。
    • 数据单元: Stream 直接操作字节或字符。Channel 总是Buffer 配合使用,数据必须先读入 Buffer 或从 Buffer 写入。
  2. 主要的接口:

    • Channel: 最基础的接口,定义了 isOpen() (通道是否打开) 和 close() (关闭通道)。
    • ReadableByteChannel: 定义了 int read(ByteBuffer dst) 方法,从通道读取数据到目标 ByteBuffer。
    • WritableByteChannel: 定义了 int write(ByteBuffer src) 方法,将源 ByteBuffer 中的数据写入通道。
    • ByteChannel: 同时继承了 ReadableByteChannelWritableByteChannel,表示可读写的字节通道。
    • SeekableByteChannel: 针对可以定位读写位置的通道(主要是文件)。增加了 position() (获取或设置当前位置), size() (获取实体大小), truncate(long size) (截断实体到指定大小) 等方法。FileChannel 实现了这个接口。
    • InterruptibleChannel: 允许在一个线程阻塞于此通道的 IO 操作时,另一个线程可以通过调用 close() 方法来中断那个阻塞的线程(阻塞的线程会抛出 AsynchronousCloseException)。
  3. FileChannel (文件通道): 用于文件 IO。

    • 获取方式:
      • 通过 FileInputStreamFileOutputStreamgetChannel() 方法获取。这种方式得到的 FileChannel 会继承流的只读或只写属性。
      • 通过 RandomAccessFilegetChannel() 方法获取。如果 RandomAccessFile 是以 "rw" (读写) 模式打开的,那么得到的 FileChannel 就既可读又可写,这是最常用的方式。
    • 常用操作:
      • read(ByteBuffer dst): 从文件的当前位置读取数据到 buffer。
      • write(ByteBuffer src): 将 buffer 中的数据写入文件的当前位置。
      • position() / position(long newPos): 获取或设置文件内的读写指针位置。
      • size(): 获取文件当前的大小。
      • truncate(long size): 将文件截断到指定的大小(超出部分被删除)。
      • transferTo(long pos, long count, WritableByteChannel target): 高效地将本文件通道的数据(从 pos 开始,最多 count 字节)传输到目标通道。
      • transferFrom(ReadableByteChannel src, long pos, long count): 高效地源通道读取数据(最多 count 字节)并写入本文件通道的 pos 位置。这两个传输方法可能利用操作系统的“零拷贝”特性,非常高效。
      • map(MapMode mode, long position, long size): 内存映射。将文件的一部分(从 position 开始,大小为 size)直接映射到内存中,返回一个 MappedByteBuffer(它是一种 DirectByteBuffer)。你可以像操作内存数组一样操作这个 Buffer。如果 modeREAD_WRITE,你对 Buffer 的修改最终会反映到磁盘文件上(但不一定是实时的)。调用 buffer.force() 可以强制将内存中的修改同步到磁盘。
  4. FileLock (文件锁):

    • 用途: 控制不同进程(独立运行的程序)之间对同一个文件(或文件的一部分)的访问。不是用来控制同一个程序内部多个线程访问文件的。主要目的是防止多个程序同时修改文件导致数据损坏。
    • 获取锁:
      • channel.lock(position, size, shared): 尝试获取锁。如果锁已被其他进程以冲突的方式持有,这个方法会阻塞,直到能获取锁为止。
      • channel.tryLock(position, size, shared): 尝试获取锁,但不阻塞。如果能立刻获得锁,就返回 FileLock 对象;如果不能(比如已被其他进程锁定),就立刻返回 null
    • 锁的类型:
      • 独占锁 (Exclusive Lock, shared = false): 同一时间,文件的某个区域只能被一个进程持有独占锁。持有者可以对该区域进行读写操作。其他任何进程都不能再对该区域获取任何锁(无论是独占还是共享)。
      • 共享锁 (Shared Lock, shared = true): 同一时间,文件的某个区域可以被多个进程同时持有共享锁。持有者只能进行读操作,不能写。它可以阻止其他进程获取该区域的独占锁,但不阻止其他进程获取共享锁
    • 注意事项: 文件锁在很多操作系统上是“建议性锁”,意味着如果程序不主动检查和遵守锁规则,它仍然可能绕过锁去访问文件。使用完锁后,必须调用 lock.release() 释放锁,最好放在 finally 块中确保执行。要锁定整个文件,通常使用 channel.lock(0, Long.MAX_VALUE, shared)。如果两个锁请求的区域有重叠,通常是不允许的(即使一个是共享,一个是独占)。

Selector (选择器) 与网络通信 (多路复用)

这是 NIO 在网络编程方面的核心优势所在,它让一个(或少数几个)线程能够高效地管理大量的网络连接。

核心概念详解:

  1. 传统阻塞 IO 的问题: 在网络编程中,传统的 ServerSocket.accept() (等待客户端连接) 和 Socket.read() (等待客户端发数据) 都是阻塞的。如果服务器要同时处理很多客户端,一种常见做法是为每个客户端连接创建一个单独的线程。当客户端数量非常多时,即使很多客户端大部分时间是空闲的(没发数据),也需要保持大量的线程,这会消耗大量的内存和 CPU 资源,导致服务器性能下降或崩溃。

  2. NIO 的非阻塞方案:

    • 设置非阻塞模式: 首先,需要将网络相关的 Channel(ServerSocketChannel, SocketChannel)设置为非阻塞模式:channel.configureBlocking(false)
    • 非阻塞行为: 设置后,accept()read() 方法会立即返回
      • accept(): 如果正好有客户端连接请求进来,它返回代表这个连接的 SocketChannel;如果没有,它返回 null
      • read(): 如果通道里有数据可读,它读取数据并返回读取的字节数;如果没有数据可读,它返回 0;如果连接已关闭,它返回 -1。
    • 新问题: 虽然不阻塞了,但程序怎么知道什么时候去调用 accept()read() 才能拿到有效连接或数据呢?不停地循环去问(轮询)会浪费 CPU。这时就需要 Selector
  3. Selector (选择器):

    • 比喻: 可以把它想象成一个事件通知中心、一个机场的塔台、或者一个能同时监控很多管道的看门人
    • 注册 (Register): 你需要把你的非阻塞 Channel "注册" 到 Selector 上,并且告诉 Selector 你对这个 Channel 的哪些事件感兴趣(比如,“当这个 ServerSocketChannel 可以接受新连接时通知我”,或者“当这个 SocketChannel 有数据可读时通知我”)。这些感兴趣的事件用常量表示:
      • SelectionKey.OP_ACCEPT: 对 ServerSocketChannel 有效,表示可以接受(accept)一个新的连接了。
      • SelectionKey.OP_CONNECT: 对 SocketChannel 有效(客户端),表示连接操作(connect)完成了。
      • SelectionKey.OP_READ: 对 SocketChannel 有效,表示通道里有数据可以读取(read)了。
      • SelectionKey.OP_WRITE: 对 SocketChannel 有效,表示可以向通道写入(write)数据了。(这个事件比较特殊,因为通常通道都是可写的,所以如果一直监听它,可能会导致 select() 持续返回,需要谨慎使用)。
    • 选择 (Select): selector.select() 是核心方法。它会阻塞在那里(或者阻塞一个设定的超时时间),直到至少有一个你注册过的 Channel 准备好了你感兴趣的事件。当有事件发生时,select() 方法会返回,并告诉你有多少个 Channel 已经就绪。
    • 获取就绪的 Key:select() 返回大于 0 的值时,你需要调用 selector.selectedKeys()。这会返回一个 Set<SelectionKey> 集合,集合里的每一个 SelectionKey 都代表一个已经就绪的 Channel。
    • SelectionKey: 这是 Channel、Selector 和事件之间的纽带。一个 SelectionKey 对象包含了:
      • 对应的 Channel (key.channel())。
      • 对应的 Selector (key.selector())。
      • 你当初注册时感兴趣的事件集合 (key.interestOps())。
      • 当前 Channel 实际已经就绪的事件集合 (key.readyOps())。你可以通过 key.isAcceptable(), key.isReadable(), key.isWritable(), key.isConnectable() 来检查具体是哪个事件就绪了。
      • 一个可选的附加对象 (key.attach(Object obj), key.attachment())。这个非常有用,你可以把与这个 Channel 相关的数据(比如用户会话信息、处理这个 Channel 的 Handler 对象等)附加到 Key 上,方便在处理事件时获取。
  4. 事件循环 (Event Loop): 使用 Selector 的典型编程模式通常是一个无限循环:

    Selector selector = Selector.open();
    // ... 把 ServerSocketChannel 注册到 selector 上,监听 OP_ACCEPT ...
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
    while (true) {
        // 1. 等待事件发生
        int readyChannels = selector.select();
        if (readyChannels == 0) {
            continue; // 可能被唤醒或超时,但没事件,继续等
        }
    
        // 2. 获取所有就绪事件的 SelectionKey
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
    
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
    
            // 3. 根据就绪的事件类型,进行处理
            if (key.isAcceptable()) {
                // 处理新连接请求:
                // ServerSocketChannel server = (ServerSocketChannel) key.channel();
                // SocketChannel client = server.accept();
                // 把 client 设为非阻塞
                // 把 client 注册到 selector 上,监听 OP_READ (通常)
            } else if (key.isReadable()) {
                // 处理客户端发来的数据:
                // SocketChannel client = (SocketChannel) key.channel();
                // ByteBuffer buffer = ...;
                // client.read(buffer);
                // 处理 buffer 中的数据...
            } else if (key.isWritable()) {
                // 处理写就绪(如果需要):
                // SocketChannel client = (SocketChannel) key.channel();
                // ByteBuffer buffer = ...; // 准备要写的数据
                // client.write(buffer);
            }
    
            // 4. *** 非常重要:处理完一个 Key 后,必须从 selectedKeys 集合中移除它!***
            keyIterator.remove();
        }
    }
    
    • 为什么必须 keyIterator.remove() select() 方法只是把就绪的 Key 添加selectedKeys 集合中,它不会自动移除。如果你处理完一个事件后不把它从集合里移除,下一次 select() 返回时(即使这个 Channel 没有新的事件),这个 Key 仍然会在 selectedKeys 集合里,导致你重复处理同一个(可能已经处理过的)事件,引发逻辑错误。
  5. 底层的 IO 多路复用模型: 文档提到了 select, poll, epoll。这些是操作系统层面实现 IO 多路复用的技术。Java 的 Selector 在不同操作系统上会使用相应最高效的底层实现(比如在 Linux 上优先使用 epoll)。epollselectpoll 更高效,因为它能精确地告诉你哪些连接就绪了(事件驱动,复杂度 O(1)),而 selectpoll 需要遍历所有监听的连接来检查状态(复杂度 O(n))。Java NIO 的 Selector 帮我们屏蔽了这些底层差异。

  6. Reactor 模式: 这是一种常用的设计模式,用来组织基于 Selector 的网络服务器程序结构,目的是分离职责,提高可维护性和可扩展性。

    • 核心组件:
      • Reactor: 通常在一个或多个专用线程中运行,负责监听事件 (selector.select()),并将就绪的事件**分发(dispatch)**给对应的 Handler 处理。
      • Handler/Acceptor: 负责处理具体的事件。Acceptor 通常专门处理连接建立事件 (OP_ACCEPT),Handler 处理读写事件 (OP_READ, OP_WRITE) 和相关的业务逻辑。
    • 几种变体:
      • 单线程 Reactor: 一个线程负责 select() 循环,并且直接在该线程中执行所有的 Handler(包括 Acceptor 和读写 Handler)。实现简单,但在 Handler 处理耗时较长时会阻塞整个 Reactor。 (文档中第一个 Reactor 示例)
      • 多线程 Reactor (使用 Worker 线程池): Reactor 线程只负责 select() 和基本的 IO 操作(比如 accept()read() 到 Buffer),然后将包含数据的 Buffer 或任务提交给一个独立的 Worker 线程池去执行耗时的业务逻辑处理和响应写入。这样 Reactor 线程能快速返回继续监听事件,提高了并发处理能力。 (文档中第二个 Reactor 示例)
      • 主从 Reactor (Main-Sub Reactor):
        • Main Reactor (通常一个线程): 只负责监听和处理 ServerSocketChannel 上的 OP_ACCEPT 事件。当接受一个新连接 (SocketChannel) 后,它负责这个连接后续的读写。
        • Sub Reactor (通常多个线程,比如每个 CPU 核心一个): 每个 Sub Reactor 都有自己独立Selector,负责处理一部分客户端连接 (SocketChannel) 上的 OP_READOP_WRITE 事件。
        • 工作流程: Main Reactor 接受新连接后,通过某种策略(如轮询)选择一个 Sub Reactor,并将这个新的 SocketChannel 注册到那个 Sub Reactor 的 Selector 上,监听 OP_READ 事件。这样,处理连接建立和处理数据读写的负载就被分散开了。 (文档中最后一个 Reactor 示例)
        • selector.wakeup():当 Main Reactor 线程要把新接受的 SocketChannel 注册到某个 Sub Reactor 的 Selector 上时,因为注册操作发生在 Main Reactor 线程,而 Sub Reactor 的 select() 可能正在其自己的线程中阻塞等待。为了确保 Sub Reactor 能及时发现这个新注册的 Channel,需要在调用 register() 之前,调用一下 Sub Reactor 的 selector.wakeup() 方法。这会中断 select() 的阻塞,让它重新检查注册的 Channel。

总结一下:

对于 I/O 模型之间的对比&代码分析

理解 BIO、NIO 和 AIO 的区别。

NIO 相关代码分析

以一个简单的“服务器接收客户端消息并回显”的场景为例。

场景: 客户端连接服务器,发送一行文本,服务器收到后加上"Server received: "前缀再发回给客户端。


1. BIO (同步阻塞 IO) - 代码与分析

核心特点: 一个连接一个线程,API 调用阻塞。

// BioServer.java
import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class BioServer {
    public static void main(String[] args) throws IOException {
        // 1. 创建一个线程池来处理客户端连接,避免无限创建线程
        //    虽然用了线程池,但本质还是一个连接对应一个活跃(或等待)线程
        ExecutorService threadPool = Executors.newCachedThreadPool();
        ServerSocket serverSocket = null;

        try {
            // 2. 创建 ServerSocket,监听 8080 端口
            serverSocket = new ServerSocket(8080);
            System.out.println("BIO Server started on port 8080, waiting for connections...");

            while (true) { // 主线程循环,持续接受新连接
                // 3. serverSocket.accept() 是一个【阻塞】方法
                //    如果没有客户端连接,线程会一直卡在这里等待
                final Socket clientSocket = serverSocket.accept();
                System.out.println("Client connected: " + clientSocket.getRemoteSocketAddress());

                // 4. 当有新连接进来时,将处理任务交给线程池中的一个线程
                threadPool.execute(() -> {
                    handleClient(clientSocket); // 调用处理客户端逻辑的方法
                });
            }
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
            threadPool.shutdown(); // 关闭线程池
        }
    }

    // 处理单个客户端连接的方法
    private static void handleClient(Socket clientSocket) {
        // 5. 使用 try-with-resources 确保流和 Socket 能被正确关闭
        try (InputStream in = clientSocket.getInputStream();
             OutputStream out = clientSocket.getOutputStream();
             // 6. 使用 BufferedReader 和 PrintWriter 简化文本读写
             BufferedReader reader = new BufferedReader(new InputStreamReader(in));
             PrintWriter writer = new PrintWriter(out, true)) // true 表示自动 flush
        {
            String line;
            // 7. reader.readLine() 是一个【阻塞】方法
            //    如果客户端没有发送数据(或者没发送换行符),线程会卡在这里等待
            while ((line = reader.readLine()) != null) {
                System.out.println("Received from " + clientSocket.getRemoteSocketAddress() + ": " + line);
                // 8. writer.println() 将数据写入缓冲区,可能也会阻塞(如果缓冲区满)
                writer.println("Server received: " + line);
            }
            // 9. 当 readLine() 返回 null 时,表示客户端断开了连接(输入流结束)
            System.out.println("Client disconnected: " + clientSocket.getRemoteSocketAddress());

        } catch (IOException e) {
            // 10. 处理读写过程中可能发生的异常
            System.err.println("Error handling client " + clientSocket.getRemoteSocketAddress() + ": " + e.getMessage());
        }
        // try-with-resources 会自动关闭 clientSocket
    }
}

// BioClient.java (用于测试)
import java.io.*;
import java.net.*;
import java.util.Scanner;

public class BioClient {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080); // 连接服务器
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 读取服务器响应
             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); // 向服务器发送数据
             Scanner scanner = new Scanner(System.in)) // 读取用户输入
        {
            System.out.println("Connected to server. Enter messages (or 'quit' to exit):");
            String userInput;
            while (!(userInput = scanner.nextLine()).equalsIgnoreCase("quit")) {
                writer.println(userInput); // 发送用户输入到服务器
                String serverResponse = reader.readLine(); // 【阻塞】等待服务器响应
                System.out.println("Server response: " + serverResponse);
            }
        } catch (IOException e) {
            System.err.println("Client Error: " + e.getMessage());
        }
        System.out.println("Client disconnected.");
    }
}

BIO 代码分析:


2. NIO (同步非阻塞 IO) - 代码与分析

核心特点: 基于 Selector 的事件驱动,一个线程管理多个连接,非阻塞 API。

// NioServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    public static void main(String[] args) throws IOException {
        // 1. 创建 Selector,用于注册和监听 Channel 事件
        Selector selector = Selector.open();

        // 2. 创建 ServerSocketChannel,作为服务器监听端口
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 3. 绑定端口
        serverChannel.bind(new InetSocketAddress(8080));
        // 4. 【关键】设置为非阻塞模式,这样 accept() 不会阻塞
        serverChannel.configureBlocking(false);

        // 5. 【关键】将 ServerSocketChannel 注册到 Selector,
        //    并指定感兴趣的事件是 OP_ACCEPT (接受连接事件)
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("NIO Server started on port 8080, waiting for events...");

        while (true) { // 事件循环 (Event Loop)
            // 6. 【关键】selector.select() 是一个【阻塞】方法 (但可以设置超时)
            //    它会阻塞,直到至少有一个注册的 Channel 准备好进行所关注的 IO 操作
            //    或者超时、或者被 selector.wakeup() 唤醒
            if (selector.select() == 0) {
                // select() 返回 0 表示没有 Channel 就绪 (例如超时或被唤醒)
                continue;
            }

            // 7. 获取所有已就绪事件的 SelectionKey 集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            // 8. 遍历就绪的 Key
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                // 9. 根据 Key 的类型判断是哪种事件就绪
                if (key.isAcceptable()) {
                    // a. 处理接受连接事件
                    handleAccept(key, selector);
                } else if (key.isReadable()) {
                    // b. 处理读数据事件
                    handleRead(key);
                }
                // ... 可以添加 isWritable() 的处理逻辑 ...

                // 10. 【非常重要】处理完一个 Key 后,必须将其从 selectedKeys 集合中移除
                //     否则下次 select() 返回时,这个 Key 还在集合里,会重复处理
                keyIterator.remove();
            }
        }
        // 注意:实际应用中需要添加关闭 Selector 和 Channel 的逻辑
    }

    // 处理接受连接事件的方法
    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        // 11. 从 Key 中获取关联的 ServerSocketChannel
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 12. 调用 accept() 接受连接,因为是非阻塞模式,会立刻返回
        //     如果有连接则返回 SocketChannel,没有则返回 null
        SocketChannel clientChannel = server.accept();
        if (clientChannel != null) {
            // 13. 【关键】将新接受的 SocketChannel 也设置为非阻塞模式
            clientChannel.configureBlocking(false);
            // 14. 【关键】将这个新的 SocketChannel 注册到【同一个】Selector 上
            //     并指定感兴趣的事件是 OP_READ (读取数据事件)
            //     可以将 ByteBuffer 作为附件关联到 Key 上,方便后续读写
            clientChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
            System.out.println("Client connected: " + clientChannel.getRemoteAddress());
        }
    }

    // 处理读数据事件的方法
    private static void handleRead(SelectionKey key) throws IOException {
        // 15. 从 Key 中获取关联的 SocketChannel
        SocketChannel clientChannel = (SocketChannel) key.channel();
        // 16. 从 Key 的附件中获取之前分配的 ByteBuffer (或者在这里创建)
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        try {
            // 17. 【关键】调用 read() 读取数据,因为是非阻塞模式,会立刻返回
            //     返回值 > 0: 读取到数据
            //     返回值 = 0: 当前没有数据可读
            //     返回值 = -1: 连接已关闭
            int bytesRead = clientChannel.read(buffer);

            if (bytesRead > 0) {
                // 18. 读取到数据,切换 Buffer 为读模式
                buffer.flip();
                // 19. 解码 Buffer 中的字节为字符串
                String receivedMessage = StandardCharsets.UTF_8.decode(buffer).toString().trim();
                System.out.println("Received from " + clientChannel.getRemoteAddress() + ": " + receivedMessage);

                // 20. 准备回显的响应数据
                String responseMessage = "Server received: " + receivedMessage;
                ByteBuffer writeBuffer = StandardCharsets.UTF_8.encode(responseMessage);

                // 21. 【关键】调用 write() 写回数据,也是非阻塞的
                //     注意:write() 不保证一次性写完所有数据,可能需要处理 "写半包"
                //     简单起见,这里假设能一次写完。复杂场景需要配合 OP_WRITE 事件。
                clientChannel.write(writeBuffer);

                // 22. 清空或压缩读 Buffer,为下次读取做准备
                // buffer.clear(); // 或者 buffer.compact();
                buffer.clear(); // 这里简单清空

            } else if (bytesRead == -1) {
                // 23. 客户端关闭连接
                System.out.println("Client disconnected: " + clientChannel.getRemoteAddress());
                key.cancel(); // 从 Selector 中注销该 Channel
                clientChannel.close(); // 关闭 Channel
            }
            // bytesRead == 0 的情况不用处理,等待下次 select() 通知即可

        } catch (IOException e) {
            // 24. 发生 IO 异常,通常也表示连接出问题
            System.err.println("Error handling read/write for " + clientChannel.getRemoteAddress() + ": " + e.getMessage());
            key.cancel();
            clientChannel.close();
        }
    }
}
// NIO Client 可以使用上面的 BioClient 进行测试,因为客户端行为是一样的。

NIO 代码分析:


3. AIO (异步非阻塞 IO) - 代码与分析

核心特点: 应用发起 IO 操作后不等待,由操作系统完成 IO 后回调通知应用。真正的异步。

// AioServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AioServer {

    private AsynchronousServerSocketChannel serverChannel;
    private ExecutorService executorService; // 用于执行回调的线程池

    public void start(int port) throws IOException {
        executorService = Executors.newCachedThreadPool();
        // 1. 创建 AsynchronousChannelGroup,管理共享资源,如线程池
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);

        // 2. 创建 AsynchronousServerSocketChannel
        serverChannel = AsynchronousServerSocketChannel.open(group);
        // 设置一些选项 (可选)
        serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        // 3. 绑定端口
        serverChannel.bind(new InetSocketAddress(port));

        System.out.println("AIO Server started on port " + port + ", waiting for connections...");

        // 4. 【关键】发起【异步】accept 操作
        //    第一个参数 attachment: 可以在回调时传递的附加对象,这里是 null
        //    第二个参数 handler: 操作完成(成功或失败)后的回调处理器
        //    这个 accept() 方法会【立刻返回】,不会阻塞
        serverChannel.accept(null, new AcceptCompletionHandler());
    }

    // 接受连接的回调处理器
    private class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {

        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
            // 5. 【回调】当一个连接成功接受时,这个方法会被【某个线程】调用
            try {
                System.out.println("Client connected: " + clientChannel.getRemoteAddress());
            } catch (IOException e) {
                System.err.println("Error getting remote address: " + e.getMessage());
            }

            // 6. 【关键】为了能持续接受下一个连接,必须再次调用 accept
            //    将 this (当前 Handler 对象) 作为 handler,形成链式调用
            serverChannel.accept(null, this);

            // 7. 为新连接准备读操作
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // 8. 创建一个包含 Channel 和 Buffer 的上下文对象,方便在回调中传递
            ClientContext context = new ClientContext(clientChannel, buffer);
            // 9. 【关键】发起【异步】read 操作
            //    将 buffer 和 context 作为 attachment 传递给回调
            //    这个 read() 方法也会【立刻返回】
            clientChannel.read(buffer, context, new ReadCompletionHandler());
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            // 10. 【回调】当接受连接操作失败时,这个方法会被调用
            System.err.println("Accept failed: " + exc.getMessage());
            // 可能需要记录日志或尝试重新启动监听
        }
    }

    // 客户端读写上下文
    private static class ClientContext {
        AsynchronousSocketChannel channel;
        ByteBuffer buffer;
        ClientContext(AsynchronousSocketChannel channel, ByteBuffer buffer) {
            this.channel = channel;
            this.buffer = buffer;
        }
    }

    // 读取数据的回调处理器
    private class ReadCompletionHandler implements CompletionHandler<Integer, ClientContext> {

        @Override
        public void completed(Integer bytesRead, ClientContext context) {
            // 11. 【回调】当读取操作成功完成时,这个方法会被调用
            //     bytesRead: 实际读取到的字节数 (-1 表示连接关闭)
            //     context: 发起 read 时传入的上下文对象

            if (bytesRead > 0) {
                // 12. 读取到数据,切换 Buffer 为读模式
                context.buffer.flip();
                String receivedMessage = StandardCharsets.UTF_8.decode(context.buffer).toString().trim();
                try {
                    System.out.println("Received from " + context.channel.getRemoteAddress() + ": " + receivedMessage);
                } catch (IOException e) { /* ignore */ }

                // 13. 准备回显数据
                String responseMessage = "Server received: " + receivedMessage;
                ByteBuffer writeBuffer = StandardCharsets.UTF_8.encode(responseMessage);

                // 14. 【关键】发起【异步】write 操作
                //     将 writeBuffer 和 context 作为 attachment 传递
                context.channel.write(writeBuffer, context, new WriteCompletionHandler());

                // 15. 清空读 Buffer 为下次读做准备 (放在这里或 Write 完成后都可以)
                context.buffer.clear();

            } else if (bytesRead == -1) {
                // 16. 客户端关闭连接
                handleDisconnection(context.channel);
            }
            // bytesRead == 0: 异步模型通常不会是 0,除非 buffer 满了?(一般不用特别处理)
        }

        @Override
        public void failed(Throwable exc, ClientContext context) {
            // 17. 【回调】当读取操作失败时,这个方法会被调用
            handleFailure("Read failed", exc, context.channel);
        }
    }

    // 写入数据的回调处理器
    private class WriteCompletionHandler implements CompletionHandler<Integer, ClientContext> {

        @Override
        public void completed(Integer bytesWritten, ClientContext context) {
            // 18. 【回调】当写入操作成功完成时,这个方法会被调用
            //     注意:bytesWritten 可能小于要写入的总字节数,表示只写了一部分

            // 19. 获取 write 时传入的 ByteBuffer (它可能还在 context 里,或者就是传入的 writeBuffer)
            //     (这里假设 Write 时传入的是待写入的 buffer,需要从 attachment 获取)
            //     为了简化,我们假设 write 时传入的 attachment 是包含 write buffer 的对象,
            //     或者像 read 那样,write 时第一个参数就是 buffer,第二个是 context
            //     我们直接依赖 Read Handler 中发起的 Write,并在完成后准备下次 Read

            // 20. 写入完成后,发起下一次【异步】read 操作,形成闭环
            //     注意:复杂应用可能需要检查是否完全写入,如果没写完要继续 write
            context.channel.read(context.buffer, context, new ReadCompletionHandler());
        }

        @Override
        public void failed(Throwable exc, ClientContext context) {
            // 21. 【回调】当写入操作失败时,这个方法会被调用
            handleFailure("Write failed", exc, context.channel);
        }
    }

    // 统一处理失败和断开连接
    private void handleFailure(String message, Throwable exc, AsynchronousSocketChannel channel) {
        try {
            System.err.println(message + " for " + channel.getRemoteAddress() + ": " + exc.getMessage());
            if (channel.isOpen()) channel.close();
        } catch (IOException e) { /* ignore close exception */ }
    }
    private void handleDisconnection(AsynchronousSocketChannel channel) {
        try {
            System.out.println("Client disconnected: " + channel.getRemoteAddress());
            if (channel.isOpen()) channel.close();
        } catch (IOException e) { /* ignore close exception */ }
    }

    public void stop() {
        try {
            if (serverChannel != null) serverChannel.close();
        } catch (IOException e) { /* ignore */ }
        if (executorService != null) executorService.shutdown();
        try {
            if (executorService != null && !executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("AIO Server stopped.");
    }


    public static void main(String[] args) {
        AioServer server = new AioServer();
        try {
            server.start(8080);
            // 让主线程保持运行,否则程序会退出
            Thread.currentThread().join();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            server.stop();
        }
    }
}
// AIO Client 可以使用上面的 BioClient 进行测试

AIO 代码分析:


IO 多路复用 (技术层面):

IO 多路复用本身是一种操作系统技术,没有直接对应的 Java 代码“模型”像 BIO/NIO/AIO 那样。但 NIO 的 Selector 就是对 IO 多路复用技术的 Java 封装。因此,NIO Server 的代码示例实际上就展示了如何在 Java 中使用 IO 多路复用技术

关键点在于:

  1. 注册多个 Channel 到一个 Selector: serverChannel.register(selector, ...)clientChannel.register(selector, ...)
  2. 调用 select() 阻塞等待: selector.select() 这一步,底层就是依赖操作系统的 select/poll/epoll 等机制来等待多个 Channel 中的任意一个就绪。
  3. 处理就绪的 Channel: 遍历 selectedKeys 并对每个 key 对应的 Channel 执行非阻塞的 IO 操作。

所以,理解了 NIO Server 的代码,尤其是 Selector 的使用,你就理解了如何在 Java 应用层面利用 IO 多路复用技术。

附录

单线程 Reactor 模式

../../../../ZZZ-Misc/Z-Attachment/images/Pasted image 20250419005045.png

graph TD  
    subgraph Single Thread  
        R[Reactor] -- 1.register --> S(Selector)  
        C1[Channel 1 ServerSocket] -- OP_ACCEPT --> S  
        C2[Channel 2 Socket] -- OP_READ/WRITE --> S  
        C3[Channel 3 Socket] -- OP_READ/WRITE --> S  

        R -- 2.select() --> S  
        S -- 3.notifies SelectionKeys --> R  

        R -- 4.dispatch Accept --> H_Accept(Acceptor Handler)  
        R -- 4.dispatch Read/Write --> H_RW(Read/Write Handler)  

        H_Accept -- 5.performs accept & registers new channel --> S  
        H_RW -- 5.performs read/write --> C2/C3  
    end  

    style R fill:#f9f,stroke:#333,stroke-width:2px  
    style S fill:#ccf,stroke:#333,stroke-width:1px  
    style H_Accept fill:#9cf,stroke:#333,stroke-width:1px  
    style H_RW fill:#9fc,stroke:#333,stroke-width:1px

多线程 Reactor (Worker Pool)
../../../../ZZZ-Misc/Z-Attachment/images/Pasted image 20250419005056.png

graph TD  
    subgraph Reactor Thread  
        R[Reactor] -- 1.register --> S(Selector)  
        C1[Channel 1 ServerSocket] -- OP_ACCEPT --> S  
        C2[Channel 2 Socket] -- OP_READ --> S  
        C3[Channel 3 Socket] -- OP_READ --> S  

        R -- 2.select() --> S  
        S -- 3.notifies SelectionKeys --> R  

        R -- 4.dispatch Accept --> H_Accept(Acceptor Handler)  
        R -- 4.dispatches Read IO Only --> H_IO(IO Handler Reader)  

        H_Accept -- 5.performs accept & registers new channel --> S  
        H_IO -- 5.reads data --> Buffer[ByteBuffer]  
        H_IO -- 6.submits task --> WP(Worker Thread Pool)  

    end  

    subgraph Worker Threads  
      WP -- 7.assigns task --> T1(Handler Thread 1)  
      WP -- 7.assigns task --> T2(Handler Thread 2)  
      T1 -- 8.processes Buffer & prepares response --> Resp1[Response Buffer]  
      T2 -- 8.processes Buffer & prepares response --> Resp2[Response Buffer]  
      T1 -- 9.May schedule write back via Reactor/Selector --> R  
      T2 -- 9.May schedule write back via Reactor/Selector --> R  
    end  

    style R fill:#f9f,stroke:#333,stroke-width:2px  
    style S fill:#ccf,stroke:#333,stroke-width:1px  
    style H_Accept fill:#9cf,stroke:#333,stroke-width:1px  
    style H_IO fill:#9fc,stroke:#333,stroke-width:1px  
    style WP fill:#fec,stroke:#333,stroke-width:2px

主从 Reactor (Main-Sub Reactor)

../../../../ZZZ-Misc/Z-Attachment/images/Pasted image 20250419005125.png

graph TD
    subgraph Main Reactor Thread
        MR[Main Reactor] -- 1.register --> MS(Main Selector)
        SSC[ServerSocketChannel] -- OP_ACCEPT --> MS

        MR -- 2.select() --> MS
        MS -- 3.notifies Accept --> MR
        MR -- 4.dispatches Accept --> MA(Acceptor)
        MA -- 5.performs accept --> NewSC(New SocketChannel)
        MA -- 6.selects SubReactor & registers NewSC --> SR1_S(SubReactor 1 Selector)
        MA -- 6.selects SubReactor & registers NewSC --> SR2_S(SubReactor 2 Selector)
    end

    subgraph Sub Reactor Thread 1
        SR1[SubReactor 1] -- register --> SR1_S
        C1[SocketChannel 1] -- OP_READ/WRITE --> SR1_S
        C3[SocketChannel 3] -- OP_READ/WRITE --> SR1_S
        SR1 -- select() --> SR1_S
        SR1_S -- notifies Read/Write --> SR1
        SR1 -- dispatches Read/Write --> H_RW1(Read/Write Handler 1)
        H_RW1 -- performs IO & processing --> C1/C3
    end

    subgraph Sub Reactor Thread 2
        SR2[SubReactor 2] -- register --> SR2_S
        C2[SocketChannel 2] -- OP_READ/WRITE --> SR2_S
        C4[SocketChannel 4] -- OP_READ/WRITE --> SR2_S
        SR2 -- select() --> SR2_S
        SR2_S -- notifies Read/Write --> SR2
        SR2 -- dispatches Read/Write --> H_RW2(Read/Write Handler 2)
        H_RW2 -- performs IO & processing --> C2/C4
    end

    style MR fill:#f9f,stroke:#333,stroke-width:2px
    style MS fill:#ccf,stroke:#333,stroke-width:1px
    style MA fill:#9cf,stroke:#333,stroke-width:1px
    style SR1 fill:#f9f,stroke:#333,stroke-width:2px
    style SR2 fill:#f9f,stroke:#333,stroke-width:2px
    style SR1_S fill:#ccf,stroke:#333,stroke-width:1px
    style SR2_S fill:#ccf,stroke:#333,stroke-width:1px
    style H_RW1 fill:#9fc,stroke:#333,stroke-width:1px
    style H_RW2 fill:#9fc,stroke:#333,stroke-width:1px